/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SHARE
#include "ob_index_builder_util.h"
#include "ob_fts_index_builder_util.h"
#include "ob_vec_index_builder_util.h"

#include "sql/resolver/ddl/ob_ddl_resolver.h"
#include "sql/resolver/expr/ob_raw_expr_util.h"
namespace oceanbase
{
using namespace common;
using namespace common::sqlclient;
using namespace obrpc;
using namespace share::schema;
using namespace sql;
namespace share
{

// Tells whether `index_type` is a vector index that is not one of the
// built-in vec SPIV index types, according to the schema helper predicates.
bool ObIndexBuilderUtil::is_do_create_dense_vec_index(const ObIndexType index_type)
{
  bool is_dense_vec_index = false;
  if (share::schema::is_vec_index(index_type)) {
    // Only vector indexes reach the SPIV check, same as the short-circuit form.
    is_dense_vec_index = !share::schema::is_built_in_vec_spiv_index(index_type);
  }
  return is_dense_vec_index;
}

// Removes the generated/identity column flag from `column` and resets both its
// current and original default values to NULL.
//
// The column is modified only when it is an identity column, or a generated
// column that is none of: fulltext, vector-index, spatial generated,
// multivalue generated, multivalue generated array. Any other column is left
// untouched.
void ObIndexBuilderUtil::del_column_flags_and_default_value(ObColumnSchemaV2 &column)
{
  const bool is_plain_generated_column = column.is_generated_column()
      && !column.is_fulltext_column()
      && !column.is_vec_index_column()
      && !column.is_spatial_generated_column()
      && !column.is_multivalue_generated_column()
      && !column.is_multivalue_generated_array_column();
  if (is_plain_generated_column || column.is_identity_column()) {
    // At most one flag is removed; the checks run in this fixed order.
    if (column.is_virtual_generated_column()) {
      column.del_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
    } else if (column.is_stored_generated_column()) {
      column.del_column_flag(STORED_GENERATED_COLUMN_FLAG);
    } else if (column.is_always_identity_column()) {
      column.del_column_flag(ALWAYS_IDENTITY_COLUMN_FLAG);
    } else if (column.is_default_identity_column()) {
      column.del_column_flag(DEFAULT_IDENTITY_COLUMN_FLAG);
    } else if (column.is_default_on_null_identity_column()) {
      column.del_column_flag(DEFAULT_ON_NULL_IDENTITY_COLUMN_FLAG);
    }
    // is_default_expr_v2_column() is evaluated after the flag removal above.
    ObObj null_default;
    null_default.set_null();
    column.set_cur_default_value(null_default, column.is_default_expr_v2_column());
    column.set_orig_default_value(null_default);
  }
}

// Copies `data_column` into the index table schema `table_schema` and records
// it in `row_desc`.
//
// @param data_column               column to copy; must not be NULL.
// @param is_index                  the column is an index column: its index position is set
//                                  and a column already present in row_desc is reported as
//                                  a duplicate.
// @param is_rowkey                 the column gets a rowkey position and rowkey order.
// @param order_type                order stored on the column when is_rowkey or is_index is set.
// @param row_desc                  row description; the column is added to it when absent.
// @param table_schema              index table schema that receives the column.
// @param is_hidden                 when true the copied column is marked hidden.
// @param is_specified_storing_col  the column is a user specified storing column.
//
// @return OB_SUCCESS on success (including the case where the column is already in
//         row_desc and is not an index column), OB_INVALID_ARGUMENT when data_column
//         is NULL, OB_ERR_COLUMN_DUPLICATE when an index column is already in row_desc,
//         OB_ERR_UNEXPECTED when an already-added storing column cannot be found in
//         table_schema, or the error code of a failing callee.
int ObIndexBuilderUtil::add_column(
    const ObColumnSchemaV2 *data_column,
    const bool is_index,
    const bool is_rowkey,
    const ObOrderType order_type,
    ObRowDesc &row_desc,
    ObTableSchema &table_schema,
    const bool is_hidden,
    const bool is_specified_storing_col)
{
  int ret = OB_SUCCESS;
  if (NULL == data_column) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("data_column is null", KP(data_column), K(ret));
  } else if (OB_INVALID_INDEX != row_desc.get_idx(
        data_column->get_table_id(), data_column->get_column_id())) {
    // The column has already been registered in row_desc.
    if (is_index) {
      ret = OB_ERR_COLUMN_DUPLICATE;
      const ObString &column_name = data_column->get_column_name_str();
      LOG_USER_ERROR(OB_ERR_COLUMN_DUPLICATE, column_name.length(), column_name.ptr());
    } else if (is_specified_storing_col) {
      // It is a storing column that the user also specified explicitly: only
      // tag the existing column, do not add it a second time.
      ObColumnSchemaV2 *store_column_schema = table_schema.get_column_schema(data_column->get_column_id());
      if (OB_ISNULL(store_column_schema)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("existed store column schema is null", K(ret), KPC(data_column), K(table_schema));
      } else {
        store_column_schema->add_column_flag(USER_SPECIFIED_STORING_COLUMN_FLAG);
      }
    }
  } else if (OB_FAIL(row_desc.add_column_desc(
          data_column->get_table_id(), data_column->get_column_id()))) {
    LOG_WARN("add_column_desc failed", "table_id", data_column->get_table_id(),
        "column_id", data_column->get_column_id(), K(ret));
  } else {
    ObColumnSchemaV2 column;
    if (OB_FAIL(column.assign(*data_column))) {
      LOG_WARN("fail to assign column", KR(ret), KPC(data_column));
    } else {
      column.set_table_id(OB_INVALID_ID);
      // Index columns are never auto increment.
      column.set_autoincrement(false);
      column.set_tbl_part_key_pos(0);
      column.set_prev_column_id(UINT64_MAX);
      column.set_next_column_id(UINT64_MAX);
      if (is_hidden) {
        column.set_is_hidden(is_hidden);
      }

      // The value of a generated column in the index table is produced by the data
      // table. Its default value (the generated column expression) is not needed, and
      // keeping it introduces failures on global index partition location lookup
      // (it refers to columns that do not exist in the index table).
      // A fulltext column's default value is needed, because GENERATED_CTXCAT_CASCADE_FLAG
      // has to be set; a spatial column's default value is needed, because
      // SPATIAL_INDEX_GENERATED_COLUMN_FLAG is set by parsing the default value.
      del_column_flags_and_default_value(column);
      if (is_rowkey) {
        column.set_rowkey_position(row_desc.get_column_num());
        column.set_order_in_rowkey(order_type);
      } else {
        column.set_rowkey_position(0);
      }

      if (is_index) {
        column.set_index_position(row_desc.get_column_num());
        column.set_order_in_rowkey(order_type);
      } else {
        column.set_index_position(0);
      }
      // User specified storing column.
      if (is_specified_storing_col) {
        column.add_column_flag(USER_SPECIFIED_STORING_COLUMN_FLAG);
      }

      if (column.is_spatial_generated_column()) {
        column.set_geo_col_id(data_column->get_geo_col_id());
      }
      if (table_schema.is_fts_index() || (table_schema.is_multivalue_index()) || (table_schema.is_vec_spiv_index())) {
        // In these index tables the column is made visible, loses the virtual
        // generated flag and gets empty default values.
        ObObj default_value;
        column.del_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
        if (column.is_word_segment_column()) {
          // Cap the word segment column length by the rowkey length limits.
          const int64_t data_length = MIN(column.get_data_length(), MIN(OB_MAX_ROW_KEY_LENGTH, OB_MAX_USER_ROW_KEY_LENGTH));
          column.set_data_length(data_length);
        }
        column.set_is_hidden(false);
        if (FAILEDx(column.set_orig_default_value(default_value))) {
          LOG_WARN("set orig default value failed", K(ret));
        } else if (OB_FAIL(column.set_cur_default_value(default_value, column.is_default_expr_v2_column()))) {
          LOG_WARN("set current default value failed", K(ret));
        }
      }
      if (column.is_vec_index_column()) {
        // Vector index columns get the same treatment regardless of the table type.
        ObObj default_value;
        column.del_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
        column.set_is_hidden(false);
        if (FAILEDx(column.set_orig_default_value(default_value))) {
          LOG_WARN("set orig default value failed", K(ret));
        } else if (OB_FAIL(column.set_cur_default_value(default_value, column.is_default_expr_v2_column()))) {
          LOG_WARN("set current default value failed", K(ret));
        }
      }
      if (OB_FAIL(ret)) {
      } else if (OB_FAIL(table_schema.add_column(column))) {
        LOG_WARN("add_column failed", K(column), K(ret));
      }
    }
  }
  return ret;
}
// Turns `shadow_column_schema` into a shadow column: nullable, without any
// column flags, with NULL default values, partition key position 0, hidden,
// and carrying the given column id and name.
//
// @param src_column_name       name assigned to the shadow column.
// @param src_column_id         column id assigned to the shadow column.
// @param shadow_column_schema  column schema modified in place.
// @return OB_SUCCESS, or the error code of the first failing setter (default
//         values or column name); on failure the remaining fields are not set.
int ObIndexBuilderUtil::set_shadow_column_info(
    const ObString &src_column_name,
    const uint64_t src_column_id,
    ObColumnSchemaV2 &shadow_column_schema)
{
  int ret = OB_SUCCESS;
  shadow_column_schema.set_nullable(true);
  // The shadow pk is an independent column,
  // so it should not inherit the column flags of the original pk.
  shadow_column_schema.set_column_flags(0);
  ObObj default_obj;
  default_obj.set_null();
  if (OB_FAIL(shadow_column_schema.set_cur_default_value(
          default_obj, shadow_column_schema.is_default_expr_v2_column()))) {
    LOG_WARN("set current default value failed", K(src_column_name), K(ret));
  } else if (OB_FAIL(shadow_column_schema.set_orig_default_value(default_obj))) {
    LOG_WARN("set orig default value failed", K(src_column_name), K(ret));
  } else {
    shadow_column_schema.set_tbl_part_key_pos(0);
    shadow_column_schema.set_column_id(src_column_id);
    shadow_column_schema.set_is_hidden(true);
    if (OB_FAIL(shadow_column_schema.set_column_name(src_column_name))) {
      LOG_WARN("set_column_name failed", K(src_column_name), K(ret));
    }
  }
  return ret;
}
int ObIndexBuilderUtil::add_shadow_pks(
    const ObTableSchema &data_schema,
    ObRowDesc &row_desc,
    ObTableSchema &schema,
    bool check_data_schema /*=true*/)
{
  int ret = OB_SUCCESS;
  if (check_data_schema && !data_schema.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(data_schema), K(ret));
  } else {
    const bool is_index_column = false;
    const bool is_rowkey = true;
    const ObColumnSchemaV2 *const_data_column = NULL;
    ObColumnSchemaV2 data_column;
    const ObRowkeyInfo &rowkey_info = data_schema.get_rowkey_info();
    const ObPartitionKeyInfo &partition_keys = data_schema.get_partition_key_info();
    const ObPartitionKeyInfo &subpartition_keys = data_schema.get_subpartition_key_info();
    char shadow_pk_name[OB_MAX_COLUMN_NAME_BUF_LENGTH];
    ObSEArray<uint64_t, 2> column_ids;
    if (data_schema.is_table_without_pk() && schema.is_global_unique_index_table()) {
      if (partition_keys.is_valid() && OB_FAIL(partition_keys.get_column_ids(column_ids))) {
        LOG_WARN("fail to get column ids from partition keys", K(ret));
      } else if (subpartition_keys.is_valid() && OB_FAIL(subpartition_keys.get_column_ids(column_ids))) {
        LOG_WARN("fail to get column ids from subpartition keys", K(ret));
      }
    }
    if (OB_FAIL(ret)) {
    } else if (OB_FAIL(rowkey_info.get_column_ids(column_ids))) {
      LOG_WARN("fail to get column ids from rowkey info", K(ret));
    } else {
      for (int64_t i = 0; OB_SUCC(ret) && i < column_ids.count(); ++i) {
        uint64_t column_id = column_ids.at(i);
        int n = snprintf(shadow_pk_name, OB_MAX_COLUMN_NAME_BUF_LENGTH, "shadow_pk_%ld", i);
        if (n < 0 || n > OB_MAX_COLUMN_NAME_LENGTH) {
          ret = OB_BUF_NOT_ENOUGH;
          LOG_ERROR("failed to generate shadow_pk_name", K(i), K(n),
              "buffer size", OB_MAX_COLUMN_NAME_BUF_LENGTH, K(ret));
        } else if (NULL == (const_data_column = (data_schema.get_column_schema(column_id)))) {
          ret = OB_ERR_BAD_FIELD_ERROR;
          LOG_WARN("get_column_schema failed", "table_id", data_schema.get_table_id(),
              K(column_id), K(ret));
        } else if (const_data_column->is_key_forbid_lob()) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_WARN("Unexpected lob column in shadow pk", "table_id", data_schema.get_table_id(),
              K(column_id), K(ret));
        } else {
          if (OB_FAIL(data_column.assign(*const_data_column))) {
            LOG_WARN("fail to assign column", KR(ret), KPC(const_data_column));
          } else if (OB_FAIL(set_shadow_column_info(shadow_pk_name, common::OB_MIN_SHADOW_COLUMN_ID + const_data_column->get_column_id(), data_column))) {
            LOG_WARN("fail to set shadow_column_info", K(ret), K(data_column), K(shadow_pk_name));
          } else {
            // primary key(uk2, uk1)
            if (data_column.get_column_id() > schema.get_max_used_column_id()) {
              schema.set_max_used_column_id(data_column.get_column_id());
            }

            if (OB_FAIL(add_column(&data_column, is_index_column, is_rowkey,
                data_column.get_order_in_rowkey(), row_desc, schema, 
                false /* is_hidden */, false /* is_specified_storing_col */))) {
              LOG_WARN("add column failed", "data_column", data_column, K(is_index_column),
                  "order_in_rowkey", data_column.get_order_in_rowkey(),
                  K(row_desc), K(ret));
            }
          }
        }
      }
    }
  }
  return ret;
}

int ObIndexBuilderUtil::add_shadow_partition_keys(
  const ObTableSchema &data_schema,
  ObRowDesc &row_desc,
  ObTableSchema &schema)
{
  int ret = OB_SUCCESS;
  if (!data_schema.is_table_without_pk() && !data_schema.is_table_with_clustering_key()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("only heap table or cluster by table should add shadow partition keys", K(data_schema), K(ret));
  } else {
    const bool is_index_column = false;
    const bool is_rowkey = !schema.is_unique_index();
    const ObPartitionKeyInfo &partition_keys = data_schema.get_partition_key_info();
    const ObPartitionKeyInfo &subpartition_keys = data_schema.get_subpartition_key_info();
    const ObColumnSchemaV2 *const_data_column = NULL;
    ObColumnSchemaV2 data_column;
    ObSEArray<uint64_t, 2> column_ids;
    if (data_schema.is_table_without_pk() || data_schema.is_table_with_clustering_key()) {
      if (partition_keys.is_valid() && OB_FAIL(partition_keys.get_column_ids(column_ids))) {
        LOG_WARN("fail to get column ids from partition keys", K(ret));
      } else if (subpartition_keys.is_valid() && OB_FAIL(subpartition_keys.get_column_ids(column_ids))) {
        LOG_WARN("fail to get column ids from subpartition keys", K(ret));
      }
    }
    if (OB_SUCC(ret)) {
      for (int64_t i = 0; OB_SUCC(ret) && i < column_ids.count(); ++i) {
        uint64_t column_id = column_ids.at(i);
        if (NULL == (const_data_column = (data_schema.get_column_schema(column_id)))) {
          ret = OB_ERR_BAD_FIELD_ERROR;
          LOG_WARN("get_column_schema failed", "table_id", data_schema.get_table_id(),
              K(column_id), K(ret));
        } else if (const_data_column->is_key_forbid_lob()) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_WARN("Unexpected lob column in shadow partition key", "table_id", data_schema.get_table_id(),
              K(column_id), K(ret));
        } else if (ob_is_roaringbitmap_tc(const_data_column->get_data_type())) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_WARN("Unexpected roaringbitmap column in shadow partition key", "table_id", data_schema.get_table_id(),
              K(column_id), K(ret));
        } else if (ob_is_extend(const_data_column->get_data_type())
                   || ob_is_user_defined_sql_type(const_data_column->get_data_type())) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_WARN("Unexpected udt column in shadow partition key", "table_id", data_schema.get_table_id(),
              K(column_id), K(ret));
        } else if (ob_is_json_tc(const_data_column->get_data_type())) {
          ret = OB_ERR_JSON_USED_AS_KEY;
          LOG_WARN("Unexpected json column in shadow pk", "table_id", data_schema.get_table_id(),
              K(column_id), K(ret));
        } else {
          if (OB_FAIL(data_column.assign(*const_data_column))) {
            LOG_WARN("fail to assign column", KR(ret), KPC(const_data_column));
          } else if (data_column.get_column_id() > schema.get_max_used_column_id()) {
            schema.set_max_used_column_id(data_column.get_column_id());
          }

          if (FAILEDx(add_column(&data_column, is_index_column, is_rowkey,
              data_column.get_order_in_rowkey(), row_desc, schema, 
              false /* is_hidden */, false /* is_specified_storing_col */))) {
            LOG_WARN("add column failed", "data_column", data_column, K(is_index_column),
                "order_in_rowkey", data_column.get_order_in_rowkey(),
                K(row_desc), K(ret));
          } else if (data_column.is_generated_column()) {
            ObSEArray<uint64_t, 5> cascaded_columns;
            if (OB_FAIL(data_column.get_cascaded_column_ids(cascaded_columns))) {
              LOG_WARN("failed to get cascaded_column_ids", K(data_column));
            }
            // need to add cascaded columns in global index table for the partition key
            for (int64_t i = 0; OB_SUCC(ret) && i < cascaded_columns.count(); ++i) {
              uint64_t column_id = cascaded_columns.at(i);
              const ObColumnSchemaV2 *cascaded_col_schema = data_schema.get_column_schema(column_id);
              if (OB_ISNULL(cascaded_col_schema)) {
                ret = OB_ERR_UNEXPECTED;
                LOG_WARN("failed to get column", K(data_schema), K(column_id), K(ret));
              } else if (OB_INVALID_INDEX == row_desc.get_idx(
                        cascaded_col_schema->get_table_id(), cascaded_col_schema->get_column_id())) {
                if (cascaded_col_schema->get_column_id() > schema.get_max_used_column_id()) {
                    schema.set_max_used_column_id(cascaded_col_schema->get_column_id());
                }
                if (OB_FAIL(add_column(cascaded_col_schema, is_index_column, is_rowkey,
                    cascaded_col_schema->get_order_in_rowkey(), row_desc, schema,
                    false /* is_hidden */, false /* is_specified_storing_col */))) {
                  LOG_WARN("add column failed", "column", *cascaded_col_schema, K(is_index_column),
                      "order_in_rowkey", cascaded_col_schema->get_order_in_rowkey(), K(row_desc), K(ret));
                }
              }
            }
          }
        }
      }
    }
  }
  return ret;
}

int ObIndexBuilderUtil::set_index_table_columns(
    const ObCreateIndexArg &arg,
    const ObTableSchema &data_schema,
    ObTableSchema &index_schema,
    bool check_data_schema /*=true*/)
{
  int ret = OB_SUCCESS;
  bool use_mysql_errno = true; //use mysql errno for functional index
  if (check_data_schema && !data_schema.is_valid()) {
    // some items in arg may be invalid, don't check arg
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(data_schema), K(ret));
  } else if (data_schema.is_valid()) {
    bool is_oracle_mode = false;
    if (OB_FAIL(data_schema.check_if_oracle_compat_mode(is_oracle_mode))) {
      LOG_WARN("check_if_oracle_compat_mode failed", K(ret));
    } else {
      use_mysql_errno = !is_oracle_mode;
    }
  }
  // no matter what index col of data table is, columns of 4 aux fts table is fixed
  if (OB_FAIL(ret)) {
  } else if (is_vec_index(arg.index_type_)) {
    if (OB_FAIL(ObVecIndexBuilderUtil::set_vec_aux_table_columns(arg, data_schema, index_schema))) {
      LOG_WARN("failed to set vec aux table columns", K(ret));
    }
  } else if (is_fts_index(arg.index_type_) ||
             is_multivalue_index(arg.index_type_)) {
    if (is_doc_rowkey_aux(arg.index_type_)) {
      if (OB_FAIL(ObFtsIndexBuilderUtil::set_fts_doc_rowkey_table_columns(arg,
                                                                          data_schema,
                                                                          index_schema))) {
        LOG_WARN("failed to set fts doc rowkey table", K(ret));
      }
    } else if (is_rowkey_doc_aux(arg.index_type_)) {
      if (OB_FAIL(ObFtsIndexBuilderUtil::set_fts_rowkey_doc_table_columns(arg,
                                                                          data_schema,
                                                                          index_schema))) {
        LOG_WARN("failed to set fts rowkey doc table", K(ret));
      }
    } else if (is_fts_index_aux(arg.index_type_)) {
      if (OB_FAIL(ObFtsIndexBuilderUtil::set_fts_index_table_columns(arg,
                                                                     data_schema,
                                                                     index_schema))) {
        LOG_WARN("failed to set fts index table", K(ret));
      }
    } else if (is_fts_doc_word_aux(arg.index_type_)) {
      if (OB_FAIL(ObFtsIndexBuilderUtil::set_fts_index_table_columns(arg,
                                                                     data_schema,
                                                                     index_schema))) {
      LOG_WARN("failed to set fts doc word table", K(ret));
      }
    } else if (is_multivalue_index(arg.index_type_)) {
      if (OB_FAIL(ObMulValueIndexBuilderUtil::set_multivalue_index_table_columns(arg,
                                                                                 data_schema,
                                                                                 index_schema))) {
        LOG_WARN("failed to set multivalue index table", K(ret));
      }
    } else {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("fts arg index type not expected", K(ret));
    }
  } else { // not fts index or multivalue index
    HEAP_VAR(ObRowDesc, row_desc) {
      bool is_index_column = false;
      // index columns
      // attention: must add index column first
      for (int64_t i = 0; OB_SUCC(ret) && i < arg.index_columns_.count(); ++i) {
        const ObColumnSchemaV2 *data_column = NULL;
        const bool is_rowkey = true;
        is_index_column = true;
        const ObColumnSortItem &sort_item = arg.index_columns_.at(i);
        if (NULL == (data_column = data_schema.get_column_schema(sort_item.column_name_))) {
          ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
          LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS,
                         sort_item.column_name_.length(),
                         sort_item.column_name_.ptr());
          LOG_WARN("get_column_schema failed", "tenant_id", data_schema.get_tenant_id(),
                   "database_id", data_schema.get_database_id(),
                   "table_name", data_schema.get_table_name(),
                   "column name", sort_item.column_name_, K(ret));
        } else if (data_column->is_key_forbid_lob()) {
          if (use_mysql_errno && data_column->is_func_idx_column()) {
            ret = OB_ERR_FUNCTIONAL_INDEX_ON_LOB;
            LOG_WARN("Cannot create a functional index on an expression that returns a BLOB or TEXT.", K(ret));
          } else {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, sort_item.column_name_.length(), sort_item.column_name_.ptr());
            LOG_WARN("Index column should not be lob type", "tenant_id", data_schema.get_tenant_id(),
                    "database_id", data_schema.get_database_id(),
                    "table_name", data_schema.get_table_name(),
                    "column name", sort_item.column_name_,
                    "column length", sort_item.prefix_len_, K(ret));
          }
        } else if (ob_is_roaringbitmap_tc(data_column->get_data_type())) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, sort_item.column_name_.length(), sort_item.column_name_.ptr());
          LOG_WARN("roaringbitmap column cannot be used in key specification", "tenant_id", data_schema.get_tenant_id(),
                  "database_id", data_schema.get_database_id(),
                  "table_name", data_schema.get_table_name(),
                  "column name", sort_item.column_name_,
                  "column length", sort_item.prefix_len_, K(ret));
        } else if (data_column->is_xmltype()) {
          ret = OB_ERR_XML_INDEX;
          LOG_USER_ERROR(OB_ERR_XML_INDEX, sort_item.column_name_.length(), sort_item.column_name_.ptr());
          LOG_WARN("Index column should not be udt type", "tenant_id", data_schema.get_tenant_id(),
                   "database_id", data_schema.get_database_id(),
                   "table_name", data_schema.get_table_name(),
                   "column name", sort_item.column_name_,
                   "column length", sort_item.prefix_len_, K(ret));
        } else if (ob_is_extend(data_column->get_data_type())
                     || ob_is_user_defined_sql_type(data_column->get_data_type())) {
          ret = OB_ERR_WRONG_KEY_COLUMN;
          LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, sort_item.column_name_.length(), sort_item.column_name_.ptr());
          LOG_WARN("Index column should not be udt type", "tenant_id", data_schema.get_tenant_id(),
                   "database_id", data_schema.get_database_id(),
                   "table_name", data_schema.get_table_name(),
                   "column name", sort_item.column_name_,
                   "column length", sort_item.prefix_len_, K(ret));
        } else if (ob_is_json_tc(data_column->get_data_type())) {
          if (use_mysql_errno && data_column->is_func_idx_column()) {
            ret = OB_ERR_FUNCTIONAL_INDEX_ON_JSON_OR_GEOMETRY_FUNCTION;
            LOG_WARN("Cannot create a functional index on an expression that returns a JSON or GEOMETRY.",K(ret));
          } else {
            ret = OB_ERR_JSON_USED_AS_KEY;
            LOG_USER_ERROR(OB_ERR_JSON_USED_AS_KEY, sort_item.column_name_.length(), sort_item.column_name_.ptr());
            LOG_WARN("JSON column cannot be used in key specification", "tenant_id", data_schema.get_tenant_id(),
                    "database_id", data_schema.get_database_id(),
                    "table_name", data_schema.get_table_name(),
                    "column name", sort_item.column_name_,
                    "column length", sort_item.prefix_len_, K(ret));
          }
        } else if (OB_FAIL(add_column(data_column, is_index_column, is_rowkey,
            arg.index_columns_.at(i).order_type_, row_desc, index_schema,
            false /* is_hidden */, false /* is_specified_storing_col */))) {
            LOG_WARN("add column failed", "data_column", *data_column, K(is_index_column),
                K(is_rowkey), "rowkey_order_type", arg.index_columns_.at(i).order_type_,
                K(row_desc), K(ret));
        }
      }
      if (OB_SUCC(ret)) {
        index_schema.set_index_column_num(row_desc.get_column_num());
        index_schema.set_rowkey_column_num(row_desc.get_column_num());
      }

      // - if not unique index, add data table's rowkey to index table's rowkey
      // - if unique index, add data schema's rowkey to redundant storing columns
      // For unique index, another hidden column would append to rowkey, which
      // before data schema's rowkey.

      // add hidden ghost_pk columns for unique index
      const bool is_unique = index_schema.is_unique_index();
      if (OB_SUCC(ret) && is_unique) {
        if (OB_FAIL(add_shadow_pks(data_schema, row_desc, index_schema, check_data_schema))) {
          LOG_WARN("add_shadow_pks failed", K(data_schema), K(row_desc), K(ret));
        }
      }

      if (OB_SUCC(ret)) {
        is_index_column = false;
        const ObColumnSchemaV2 *data_column = NULL;
        const ObRowkeyInfo &rowkey_info = data_schema.get_rowkey_info();
        const ObColumnSchemaV2 *output_column = nullptr;
        uint64_t tenant_data_version = 0;
        for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_info.get_size(); ++i) {
          uint64_t column_id = OB_INVALID_ID;
          const bool is_rowkey = !index_schema.is_unique_index();
          ObColumnSchemaV2 modified_column;
          if (OB_FAIL(rowkey_info.get_column_id(i, column_id))) {
            LOG_WARN("get_column_id failed", "index", i, K(ret));
          } else if (NULL == (data_column = data_schema.get_column_schema(column_id))) {
            ret = OB_ERR_BAD_FIELD_ERROR;
            LOG_WARN("get_column_schema failed", "table_id", data_schema.get_table_id(),
                K(column_id), K(ret));
          } else if (data_column->is_key_forbid_lob()) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_WARN("Lob column should not appear in rowkey position", "data_column", *data_column, K(is_index_column),
                K(is_rowkey), "order_in_rowkey", data_column->get_order_in_rowkey(),
                K(row_desc), K(ret));
          } else if (ob_is_roaringbitmap_tc(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_WARN("roaringbitmap column should not appear in rowkey position", "data_column", *data_column, K(is_index_column),
                K(is_rowkey), "order_in_rowkey", data_column->get_order_in_rowkey(),
                K(row_desc), K(ret));
          } else if (ob_is_extend(data_column->get_data_type()) || ob_is_user_defined_sql_type(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_WARN("udt column should not appear in rowkey position", "data_column", *data_column, K(is_index_column),
                K(is_rowkey), "order_in_rowkey", data_column->get_order_in_rowkey(),
                K(row_desc), K(ret));
          } else if (ob_is_json_tc(data_column->get_data_type())) {
            ret = OB_ERR_JSON_USED_AS_KEY;
            LOG_WARN("JSON column cannot be used in key specification.", "data_column", *data_column, K(is_index_column),
                K(is_rowkey), "order_in_rowkey", data_column->get_order_in_rowkey(),
                K(row_desc), K(ret));
          } else if (FALSE_IT(output_column = data_column)) {
          } else if (OB_FAIL(GET_MIN_DATA_VERSION(data_schema.get_tenant_id(), tenant_data_version))) {
            LOG_WARN("get tenant data version failed", K(ret), K(data_schema.get_tenant_id()));
          } else if (tenant_data_version >= DATA_VERSION_4_5_0_0 && data_schema.is_table_without_pk()
                     && share::schema::is_index_local_storage(arg.index_type_)
                     && OB_FAIL(add_skip_index_for_hidden_pk(*data_column, modified_column, output_column))) {
            LOG_WARN("add skip index for auto inc pk failed", K(ret));
          } else if (OB_FAIL(add_column(output_column, is_index_column, is_rowkey,
              data_column->get_order_in_rowkey(), row_desc, index_schema,
              false /* is_hidden */, false /* is_specified_storing_col */))) {
            LOG_WARN("add column failed", K(ret));
          }
        }
      }

      // if not unique index, update rowkey_column_num
      if (OB_SUCCESS == ret && !index_schema.is_unique_index()) {
        index_schema.set_rowkey_column_num(row_desc.get_column_num());
      }

      // if data table is a heap table or a cluster by table, add partition keys to index table
      if (OB_SUCC(ret) && (data_schema.is_table_without_pk() || data_schema.is_table_with_clustering_key())
       && index_schema.is_global_index_table()) {
        if (OB_FAIL(add_shadow_partition_keys(data_schema, row_desc, index_schema))) {
          LOG_WARN("add_shadow_partition_keys failed", K(data_schema), K(row_desc), K(ret));
        }
      }

      // add redundant storing columns
      if (OB_SUCC(ret)) {
        for (int64_t i = 0; OB_SUCC(ret) && i < arg.store_columns_.count(); ++i) {
          const ObColumnSchemaV2 *data_column = NULL;
          const bool is_rowkey = false;
          const bool is_specified_storing_col = true;
          // is_rowkey is false, order_in_rowkey will not be used
          const ObOrderType order_in_rowkey = ObOrderType::DESC;
          if (NULL == (data_column = data_schema.get_column_schema(arg.store_columns_.at(i)))) {
            ret = OB_ERR_BAD_FIELD_ERROR;
            LOG_WARN("get_column_schema failed", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.store_columns_.at(i), K(ret));
          } else if (data_column->is_key_forbid_lob()) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.store_columns_.at(i).length(), arg.store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be lob type", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.store_columns_.at(i), K(ret));
          } else if (ob_is_roaringbitmap_tc(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.store_columns_.at(i).length(), arg.store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be roaringbitmap type", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.store_columns_.at(i), K(ret));
          } else if (ob_is_extend(data_column->get_data_type()) || ob_is_user_defined_sql_type(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.store_columns_.at(i).length(), arg.store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be udt type", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.store_columns_.at(i), K(ret));
          } else if (ob_is_json_tc(data_column->get_data_type())) {
            ret = OB_ERR_JSON_USED_AS_KEY;
            LOG_USER_ERROR(OB_ERR_JSON_USED_AS_KEY, arg.store_columns_.at(i).length(), arg.store_columns_.at(i).ptr());
            LOG_WARN("JSON column cannot be used in key specification.", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.store_columns_.at(i), K(ret));
          } else if (ob_is_collection_sql_type(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.store_columns_.at(i).length(), arg.store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be collection type", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.store_columns_.at(i), K(ret));
          } else if (ob_is_geometry_tc(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.store_columns_.at(i).length(), arg.store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be geometry type", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.store_columns_.at(i), K(ret));
          } else if (OB_FAIL(add_column(data_column, is_index_column, is_rowkey,
              order_in_rowkey, row_desc, index_schema, false /* is_hidden */, is_specified_storing_col))) {
            LOG_WARN("add_column failed", "data_column", *data_column, K(is_index_column),
                K(is_rowkey), K(order_in_rowkey), K(row_desc), K(ret));
          }
        }
      }

      // add redundant hidden storing columns
      if (OB_SUCC(ret)) {
        for (int64_t i = 0; OB_SUCC(ret) && i < arg.hidden_store_columns_.count(); ++i) {
          const ObColumnSchemaV2 *data_column = NULL;
          const bool is_rowkey = false;
          // is_rowkey is false, order_in_rowkey will not be used
          const ObOrderType order_in_rowkey = ObOrderType::DESC;
          if (OB_ISNULL(data_column = data_schema.get_column_schema(arg.hidden_store_columns_.at(i)))) {
            ret = OB_ERR_BAD_FIELD_ERROR;
            LOG_WARN("get_column_schema failed", "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.hidden_store_columns_.at(i), K(ret));
          } else if (data_column->is_key_forbid_lob()) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.hidden_store_columns_.at(i).length(),
                                                    arg.hidden_store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be lob type",
                "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.hidden_store_columns_.at(i), K(ret));
          } else if (ob_is_roaringbitmap_tc(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.hidden_store_columns_.at(i).length(),
                                                    arg.hidden_store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be roaringbitmap type",
                "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.hidden_store_columns_.at(i), K(ret));
          } else if (ob_is_extend(data_column->get_data_type())
                     || ob_is_user_defined_sql_type(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.hidden_store_columns_.at(i).length(),
                                                    arg.hidden_store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be udt type",
                "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.hidden_store_columns_.at(i), K(ret));
          } else if (ob_is_json_tc(data_column->get_data_type())) {
            ret = OB_ERR_JSON_USED_AS_KEY;
            LOG_USER_ERROR(OB_ERR_JSON_USED_AS_KEY, arg.hidden_store_columns_.at(i).length(),
                                                    arg.hidden_store_columns_.at(i).ptr());
            LOG_WARN("JSON column '%.*s' cannot be used in key specification.",
                "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.hidden_store_columns_.at(i), K(ret));
          } else if (ob_is_collection_sql_type(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.hidden_store_columns_.at(i).length(),
                                                    arg.hidden_store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be collection type",
                "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.hidden_store_columns_.at(i), K(ret));
          } else if (ob_is_geometry_tc(data_column->get_data_type())) {
            ret = OB_ERR_WRONG_KEY_COLUMN;
            LOG_USER_ERROR(OB_ERR_WRONG_KEY_COLUMN, arg.hidden_store_columns_.at(i).length(),
                                                    arg.hidden_store_columns_.at(i).ptr());
            LOG_WARN("Index storing column should not be geometry type",
                "tenant_id", data_schema.get_tenant_id(),
                "database_id", data_schema.get_database_id(), "table_name",
                data_schema.get_table_name(), "column name", arg.hidden_store_columns_.at(i), K(ret));
          } else if (OB_FAIL(add_column(data_column, is_index_column, is_rowkey,
                                        order_in_rowkey, row_desc, index_schema, 
                                        true /* is_hidden */, false /* is_specified_storing_col */))) {
            LOG_WARN("add_column failed", "data_column", *data_column, K(is_index_column),
                K(is_rowkey), K(order_in_rowkey), K(row_desc), K(ret));
          }
        }
      }
      if (OB_SUCC(ret)) {
        if (OB_FAIL(index_schema.sort_column_array_by_column_id())) {
          LOG_WARN("failed to sort column", K(ret));
        }
      }
    }
  }
  return ret;
}

int ObIndexBuilderUtil::add_skip_index_for_hidden_pk(const ObColumnSchemaV2 &original_column,
                                                     ObColumnSchemaV2 &modified_column,
                                                     const ObColumnSchemaV2* &output_column)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(!ObColumnSchemaV2::is_hidden_pk_column_id(original_column.get_column_id()))) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("column is not hidden pk column", K(original_column), K(ret));
  } else if (OB_UNLIKELY(!original_column.is_rowkey_column())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("column is not primary key column", K(original_column), K(ret));
  } else if (OB_FAIL(modified_column.assign(original_column))) {
    LOG_WARN("assign column schema failed", K(ret));
  } else {
    ObSkipIndexColumnAttr skip_index_attr = modified_column.get_skip_index_attr();
    skip_index_attr.set_min_max();
    modified_column.set_skip_index_attr(skip_index_attr.get_packed_value());
    output_column = &modified_column;
  }
  return ret;
}

// Dispatch the adjustment of create-index arguments according to arg.index_type_.
//
//  - arg.fulltext_columns_ must be empty, otherwise OB_ERR_UNEXPECTED is returned.
//  - spatial index: rejected with OB_NOT_SUPPORTED when the tenant min data version is
//    below DATA_VERSION_4_1_0_0; otherwise adjust_spatial_args() is called and the two
//    columns it returns (cellid first, then mbr) are appended to gen_columns.
//  - dense vector index or sparse-vector aux index: check_vec_index_allowed() followed
//    by adjust_vec_args().
//  - fulltext index: ObFtsIndexBuilderUtil::adjust_fts_args().
//  - multivalue index: ObMulValueIndexBuilderUtil::adjust_mulvalue_index_args().
//  - anything else: adjust_ordinary_index_column_args().
//
// @param arg          create index argument, adjusted in place by the selected helper
// @param data_schema  schema of the data table, passed on to the selected helper
// @param allocator    allocator passed on to the selected helper
// @param gen_columns  receives the generated columns reported by the selected helper
// @return OB_SUCCESS or the first error encountered
int ObIndexBuilderUtil::adjust_expr_index_args(
    ObCreateIndexArg &arg,
    ObTableSchema &data_schema,
    ObIAllocator &allocator,
    ObIArray<ObColumnSchemaV2*> &gen_columns)
{
  int ret = OB_SUCCESS;
  if (arg.fulltext_columns_.count() > 0) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("fulltext_columns_ is deprecated!", K(ret));
  } else if (ObSimpleTableSchemaV2::is_spatial_index(arg.index_type_)) {
    // slot 0 holds the cellid column, slot 1 the mbr column
    ObSEArray<ObColumnSchemaV2*, 2> cellid_mbr_cols;
    uint64_t tenant_data_version = 0;
    if (OB_FAIL(GET_MIN_DATA_VERSION(data_schema.get_tenant_id(), tenant_data_version))) {
      LOG_WARN("get tenant data version failed", K(ret));
    } else if (tenant_data_version < DATA_VERSION_4_1_0_0) {
      ret = OB_NOT_SUPPORTED;
      LOG_WARN("tenant version is less than 4.1, spatial index not supported",
          K(ret), K(tenant_data_version));
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "tenant version is less than 4.1, spatial index");
    } else if (OB_FAIL(adjust_spatial_args(arg, data_schema, allocator, cellid_mbr_cols))) {
      LOG_WARN("adjust spatial args failed", K(ret));
    } else {
      if (OB_FAIL(gen_columns.push_back(cellid_mbr_cols.at(0)))) {
        LOG_WARN("push back cellid column to gen columns failed", K(ret));
      } else if (OB_FAIL(gen_columns.push_back(cellid_mbr_cols.at(1)))) {
        LOG_WARN("push back mbr column to gen columns failed", K(ret));
      }
    }
  } else if (ObIndexBuilderUtil::is_do_create_dense_vec_index(arg.index_type_)
             || is_vec_spiv_index_aux(arg.index_type_)) {
    if (OB_FAIL(ObVecIndexBuilderUtil::check_vec_index_allowed(arg.index_type_, data_schema))) {
      LOG_WARN("fail to check vector index allowed", K(ret));
    } else if (OB_FAIL(ObVecIndexBuilderUtil::adjust_vec_args(arg, data_schema, allocator, gen_columns))) {
      LOG_WARN("failed to adjust vec index args", K(ret), K(arg.index_type_));
    }
  } else if (is_fts_index(arg.index_type_)) {
    if (OB_FAIL(ObFtsIndexBuilderUtil::adjust_fts_args(arg, data_schema, allocator, gen_columns))) {
      LOG_WARN("failed to adjust fts args", K(ret));
    }
  } else if (is_multivalue_index(arg.index_type_)) {
    if (OB_FAIL(ObMulValueIndexBuilderUtil::adjust_mulvalue_index_args(
        arg, data_schema, allocator, gen_columns, true))) {
      LOG_WARN("failed to adjust multivalue args", K(ret));
    }
  } else {
    // every remaining index type is handled as an ordinary index
    if (OB_FAIL(adjust_ordinary_index_column_args(arg, data_schema, allocator, gen_columns))) {
      LOG_WARN("adjust ordinary index column args failed", K(ret));
    }
  }
  return ret;
}

// Rewrite arg.index_columns_ of an ordinary index so that every sort item names a
// column of data_schema.
//
// Each sort item is handled as one of three cases:
//  - plain column (not a functional item, prefix_len_ <= 0): the column must exist in
//    data_schema, otherwise OB_ERR_KEY_COLUMN_DOES_NOT_EXITS is reported.
//  - prefix column (prefix_len_ > 0): generate_prefix_column() supplies the column, the
//    sort item is renamed to it and prefix_len_ is reset to 0.
//  - functional item: the expression text in column_name_ is resolved. If it is a real
//    expression, generate_ordinary_generated_column() supplies a hidden generated column
//    and the sort item is renamed to it. If it is a bare column reference, Oracle mode
//    uses that column directly and MySQL mode fails with OB_ERR_FUNCTIONAL_INDEX_ON_FIELD.
//
// @param arg          create index argument; index_columns_ is replaced on success
// @param data_schema  data table schema; may gain new hidden generated columns
// @param allocator    used to deep copy the column names stored in the new sort items
// @param gen_columns  receives every column for which data_schema's column count grew
// @return OB_SUCCESS or the first error encountered
int ObIndexBuilderUtil::adjust_ordinary_index_column_args(
    ObCreateIndexArg &arg,
    ObTableSchema &data_schema,
    ObIAllocator &allocator,
    ObIArray<ObColumnSchemaV2*> &gen_columns)
{
  int ret = OB_SUCCESS;
  ObIArray<ObColumnSortItem> &sort_items = arg.index_columns_;
  ObArray<ObColumnSortItem> new_sort_items;
  lib::Worker::CompatMode compat_mode = lib::Worker::CompatMode::MYSQL;
  uint64_t tenant_id = data_schema.get_tenant_id();
  // NOTE(review): the return code is ignored here, so compat_mode keeps its MYSQL initial
  // value unless the getter overwrites it -- confirm this best-effort behavior is intended
  ObCompatModeGetter::get_tenant_mode(tenant_id, compat_mode);
  lib::CompatModeGuard compat_guard(compat_mode);
  for (int64_t i = 0; OB_SUCC(ret) && i < sort_items.count(); ++i) {
    // column count before this item is processed; growth means a column was generated
    int64_t old_cnt = data_schema.get_column_count();
    ObColumnSortItem new_sort_item = sort_items.at(i);
    ObColumnSchemaV2 *gen_col = NULL;
    if (!new_sort_item.is_func_index_ && new_sort_item.prefix_len_ <= 0) {
      // plain index column (neither functional nor prefix) takes this path
      const ObColumnSchemaV2 *col_schema = data_schema.get_column_schema(new_sort_item.column_name_);
      if (OB_ISNULL(col_schema)) {
        ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
        LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS,
                      new_sort_item.column_name_.length(),
                      new_sort_item.column_name_.ptr());
      }
    } else {
      // NOTE: this arena intentionally shadows the `allocator` parameter inside this
      // branch; it only backs the temporary session / expression objects below
      ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
      ObRawExprFactory expr_factory(allocator);
      SMART_VAR(sql::ObSQLSessionInfo, session) {
        SMART_VARS_2((sql::ObExecContext, exec_ctx, allocator), (ObPhysicalPlanCtx, phy_plan_ctx, allocator)) {
          uint64_t tenant_id = data_schema.get_tenant_id();
          const ObTenantSchema *tenant_schema = NULL;
          ObSchemaGetterGuard guard;
          ObSchemaChecker schema_checker;
          ObRawExpr *expr = NULL;
          LinkExecCtxGuard link_guard(session, exec_ctx);
          ObObj sql_mode_obj;
          exec_ctx.set_my_session(&session);
          exec_ctx.set_is_ps_prepare_stage(false);
          exec_ctx.set_physical_plan_ctx(&phy_plan_ctx);
          sql_mode_obj.set_uint64(arg.sql_mode_);
          if (OB_FAIL(session.init(0 /*default session id*/,
                                  0 /*default proxy id*/,
                                  &allocator))) {
            LOG_WARN("init session failed", K(ret));
          /* ex: create index H_IDX1 ON EMP3(sum_sal(sal,comm)); */
          /* sum_sal is udf, need default schema to resolve */
          /* when has schema prefix, create index H_IDX1 ON EMP3(test.sum_sal(sal,comm)); */
          /* pl resolver will use prefix schema and ignore the default database name */
          } else if (OB_FAIL(session.set_default_database(arg.database_name_))) {
            LOG_WARN("failed to set default session default database name", K(ret));
          } else if (OB_ISNULL(GCTX.schema_service_)) {
            // guard the dereference below; previously only the later use was checked
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("schema service is null", K(ret));
          } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard))) {
            LOG_WARN("get schema guard failed", K(ret));
          } else if (OB_FAIL(schema_checker.init(guard))) {
            LOG_WARN("failed to init schema checker", K(ret));
          } else if (OB_FAIL(guard.get_tenant_info(tenant_id, tenant_schema))) {
            LOG_WARN("get tenant_schema failed", K(ret));
          } else if (OB_ISNULL(tenant_schema)) {
            // tenant_schema is dereferenced right below, so a missing tenant must stop here
            ret = OB_TENANT_NOT_EXIST;
            LOG_WARN("tenant schema is null", K(ret), K(tenant_id));
          } else if (OB_FAIL(session.init_tenant(tenant_schema->get_tenant_name_str(), tenant_id))) {
            LOG_WARN("init tenant failed", K(ret));
          } else if (OB_FAIL(session.load_all_sys_vars(guard))) {
            LOG_WARN("session load system variable failed", K(ret));
          } else if (OB_FAIL(session.load_default_configs_in_pc())) {
            LOG_WARN("session load default configs failed", K(ret));
          } else if (OB_FAIL(arg.local_session_var_.update_session_vars_with_local(session))) {
            LOG_WARN("fail to update session vars", K(ret));
          } else if (OB_FAIL(session.update_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode_obj))) {
            LOG_WARN("fail to update sql mode", K(ret));
          } else if (new_sort_item.prefix_len_ > 0) {
            //handle prefix column index
            if (OB_FAIL(generate_prefix_column(new_sort_item, arg.sql_mode_, session, data_schema, gen_col))) {
              LOG_WARN("generate prefix column failed", K(ret));
            } else {
              new_sort_item.column_name_ = gen_col->get_column_name_str();
              new_sort_item.prefix_len_ = 0;
            }
          } else {
            //parse ordinary index expr as the real expr(maybe column expression)
            const ObString &index_expr_def = new_sort_item.column_name_;
            if (OB_FAIL(ObRawExprUtils::build_generated_column_expr(&arg,
                                                          index_expr_def,
                                                          expr_factory,
                                                          session,
                                                          data_schema,
                                                          expr,
                                                          &schema_checker,
                                                          ObResolverUtils::CHECK_FOR_FUNCTION_INDEX,
                                                          NULL))) {
              LOG_WARN("build generated column expr failed", K(ret));
            } else if (!expr->is_column_ref_expr()) {
              //real index expr, so generate hidden generated column in data table schema
              if (lib::Worker::CompatMode::MYSQL == compat_mode) {
                // MySQL mode rejects functional indexes by the result type of the expression
                if (ob_is_geometry(expr->get_result_type().get_type())) {
                  ret = OB_ERR_SPATIAL_FUNCTIONAL_INDEX;
                  LOG_WARN("Spatial functional index is not supported.", K(ret));
                } else if (ob_is_json_tc(expr->get_result_type().get_type())) {
                  ret = OB_ERR_FUNCTIONAL_INDEX_ON_JSON_OR_GEOMETRY_FUNCTION;
                  LOG_WARN("Cannot create a functional index on an expression that returns a JSON or GEOMETRY.",K(ret));
                } else if (ob_is_collection_sql_type(expr->get_result_type().get_type())) {
                  ret = OB_ERR_FUNCTIONAL_INDEX_ON_FIELD;
                  LOG_WARN("Cannot create a functional index on an expression that returns a ARRAY.",K(ret));
                } else if (ob_is_text_tc(expr->get_result_type().get_type()) || ob_is_lob_tc(expr->get_result_type().get_type())) {
                  ret = OB_ERR_FUNCTIONAL_INDEX_ON_LOB;
                  LOG_WARN("Cannot create a functional index on an expression that returns a BLOB or TEXT.", K(ret));
                }
              }
              if (OB_FAIL(ret)) {
              } else if (OB_ISNULL(GCTX.schema_service_)) {
                ret = OB_ERR_UNEXPECTED;
                LOG_WARN("unexpected null", K(ret));
              } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(data_schema.get_tenant_id(), guard))) {
                LOG_WARN("get schema guard failed", K(ret));
              } else if (OB_FAIL(generate_ordinary_generated_column(
                  *expr, session, data_schema, gen_col, &guard))) {
                LOG_WARN("generate ordinary generated column failed", K(ret));
              } else if (OB_FAIL(ObRawExprUtils::check_generated_column_expr_str(
                  gen_col->get_cur_default_value().get_string(), session, data_schema))) {
                LOG_WARN("fail to check printed generated column expr", K(ret));
              } else {
                new_sort_item.column_name_ = gen_col->get_column_name_str();
                new_sort_item.is_func_index_ = false;
              }
              if (OB_SUCC(ret) && lib::Worker::CompatMode::MYSQL == compat_mode) {
                // MySQL mode: no column referenced by the expression may be auto-increment
                ObSEArray<ObRawExpr*, 4> dep_columns;
                if (OB_FAIL(ObRawExprUtils::extract_column_exprs(expr, dep_columns))) {
                  LOG_WARN("extract column exprs failed", K(ret), K(expr));
                } else {
                  for (int64_t j = 0; OB_SUCC(ret) && j < dep_columns.count(); ++j) {
                    const ObRawExpr *dep_column = dep_columns.at(j);
                    if (OB_ISNULL(dep_column)) {
                      ret = OB_ERR_UNEXPECTED;
                      LOG_WARN("deps_column is null");
                    } else if (!dep_column->is_column_ref_expr()) {
                      ret = OB_ERR_UNEXPECTED;
                      LOG_WARN("dep column is invalid", K(ret), KPC(dep_column));
                    } else if (dep_column->is_auto_increment()) {
                      ret = OB_ERR_FUNCTIONAL_INDEX_REF_AUTO_INCREMENT;
                      LOG_USER_ERROR(OB_ERR_FUNCTIONAL_INDEX_REF_AUTO_INCREMENT, arg.index_name_.length(), arg.index_name_.ptr());
                      LOG_WARN("Functional index cannot refer to an auto-increment column.", K(ret));
                    }
                  }
                }
              }
            } else if (lib::Worker::CompatMode::ORACLE == compat_mode) {
              // the expression is a bare column reference: Oracle mode indexes that column
              const ObColumnRefRawExpr *ref_expr = static_cast<const ObColumnRefRawExpr*>(expr);
              new_sort_item.column_name_ = ref_expr->get_column_name();
              new_sort_item.is_func_index_ = false;
            } else {
              ret = OB_ERR_FUNCTIONAL_INDEX_ON_FIELD;
              LOG_WARN("Functional index on a column is not supported.", K(ret), K(*expr));
            }
          }
          // detach the plan ctx that lives in this scope, on success and failure alike
          exec_ctx.set_physical_plan_ctx(NULL);
        }
      }
    }
    if (OB_SUCC(ret)) {
      ObString tmp_name = new_sort_item.column_name_;
      //Keep the memory lifetime of column_name consistent with index_arg
      if (OB_FAIL(ob_write_string(allocator, tmp_name, new_sort_item.column_name_))) {
        LOG_WARN("deep copy column name failed", K(ret));
      } else if (OB_FAIL(new_sort_items.push_back(new_sort_item))) {
        LOG_WARN("store new sort item failed", K(ret), K(new_sort_item));
      } else if (data_schema.get_column_count() > old_cnt) {
        // a new generated column was created for this item, so record it
        LOG_INFO("column info", KPC(gen_col), K(old_cnt), K(data_schema.get_column_count()));
        if (OB_FAIL(gen_columns.push_back(gen_col))) {
          LOG_WARN("store generated column failed", K(ret));
        }
      }
    }
  }
  if (OB_SUCC(ret)) {
    // replace the caller's sort items only after every item was adjusted successfully
    sort_items.reset();
    if (OB_FAIL(sort_items.assign(new_sort_items))) {
      LOG_WARN("assign sort items failed", K(ret));
    }
  }
  return ret;
}

// Find or create the hidden virtual generated column that backs a functional index
// expression.
//
// The expression is printed to text. If data_schema already has a hidden generated
// column with the same definition and the same local session vars, that column is
// reused. Otherwise a new hidden virtual generated column named "SYS_NC<column id>$"
// is added to data_schema.
//
// @param expr          resolved index expression
// @param session       session used for sql_mode and for extracting local session vars
// @param data_schema   data table schema; a column may be added to it
// @param gen_col       output only: the reused or newly added column. Its incoming value
//                      is never read.
// @param schema_guard  passed to the expression printer
// @param index_id      column id requested by the caller, or OB_INVALID_ID for "none".
//                      When a column is reused its id must equal index_id. When a column
//                      is created index_id must be in the user column id range and unused.
// @return OB_SUCCESS; OB_ERR_INVALID_COLUMN_ID when index_id cannot be honored;
//         otherwise the first error encountered
int ObIndexBuilderUtil::generate_ordinary_generated_column(
    ObRawExpr &expr,
    const sql::ObSQLSessionInfo &session,
    ObTableSchema &data_schema,
    ObColumnSchemaV2 *&gen_col,
    ObSchemaGetterGuard *schema_guard,
    const uint64_t index_id)
{
  int ret = OB_SUCCESS;
  ObColumnSchemaV2 tmp_gen_col;
  uint64_t tenant_data_version = 0;
  SMART_VAR(char[OB_MAX_DEFAULT_VALUE_LENGTH], expr_def_buf) {
    MEMSET(expr_def_buf, 0, sizeof(expr_def_buf));
    int64_t pos = 0;
    ObRawExprPrinter expr_printer(expr_def_buf, OB_MAX_DEFAULT_VALUE_LENGTH, &pos, schema_guard);
    // only consulted when index_id != OB_INVALID_ID
    const bool is_invalid = (index_id < OB_APP_MIN_COLUMN_ID || index_id > OB_MIN_SHADOW_COLUMN_ID);
    if (OB_FAIL(expr_printer.do_print(&expr, T_NONE_SCOPE, true))) {
      LOG_WARN("print expr definition failed", K(ret));
    }
    if (OB_SUCC(ret)) {
      if (OB_FAIL(GET_MIN_DATA_VERSION(data_schema.get_tenant_id(), tenant_data_version))) {
        LOG_WARN("get tenant data version failed", K(ret));
      } else if (tenant_data_version < DATA_VERSION_4_2_2_0) {
        //do nothing
      } else if (OB_FAIL(ObRawExprUtils::extract_local_vars_for_gencol(&expr, &session, tmp_gen_col))) {
        LOG_WARN("fail to extract sysvar from expr", K(ret));
      }
    }
    if (OB_SUCC(ret)) {
      // add check
      ObString expr_def(pos, expr_def_buf);
      ObColumnSchemaV2 *old_gen_col = NULL;
      if (OB_FAIL(data_schema.get_generated_column_by_define(expr_def,
                                                             true/*only hidden column*/,
                                                             old_gen_col))) {
        LOG_WARN("get generated column by define failed", K(ret), K(expr_def));
      } else if (old_gen_col != NULL
                && tmp_gen_col.get_local_session_var() == old_gen_col->get_local_session_var()) {
        //got it
        gen_col = old_gen_col;
        // A caller-specified column id must match the column being reused. This check
        // lives here, not in the create branch, because gen_col is only valid here.
        if (OB_INVALID_ID != index_id && index_id != gen_col->get_column_id()) {
          ret = OB_ERR_INVALID_COLUMN_ID;
          LOG_WARN("column id specified by create index mismatch with column id", K(ret), K(gen_col), K(index_id));
        }
      } else {
        //need to add new generated column
        ObObj default_value;
        char col_name_buf[OB_MAX_COLUMN_NAMES_LENGTH] = {'\0'};
        pos = 0;
        default_value.set_varchar(expr_def);
        tmp_gen_col.set_rowkey_position(0); // not a rowkey column
        tmp_gen_col.set_index_position(0); // not an index column
        tmp_gen_col.set_tbl_part_key_pos(0); // not a partition key
        tmp_gen_col.set_tenant_id(data_schema.get_tenant_id());
        tmp_gen_col.set_table_id(data_schema.get_table_id());
        tmp_gen_col.set_column_id(data_schema.get_max_used_column_id() + 1);
        tmp_gen_col.add_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
        if (is_pad_char_to_full_length(session.get_sql_mode())) {
          tmp_gen_col.add_column_flag(PAD_WHEN_CALC_GENERATED_COLUMN_FLAG);
        }
        tmp_gen_col.set_is_hidden(true);
        if (expr.get_result_type().is_null()) {
          // an expression of NULL type is stored as a varchar column with zero accuracy
          const ObAccuracy varchar_accuracy(0);
          tmp_gen_col.set_data_type(ObVarcharType);
          tmp_gen_col.set_collation_type(data_schema.get_collation_type());
          tmp_gen_col.set_accuracy(varchar_accuracy);
        } else {
          tmp_gen_col.set_data_type(expr.get_data_type());
          tmp_gen_col.set_collation_type(expr.get_collation_type());
          tmp_gen_col.set_accuracy(expr.get_accuracy());
        }
        tmp_gen_col.set_prev_column_id(UINT64_MAX);
        tmp_gen_col.set_next_column_id(UINT64_MAX);
        // record every column the expression references as a cascaded column
        ObSEArray<ObRawExpr*, 4> dep_columns;
        if (OB_FAIL(ObRawExprUtils::extract_column_exprs(&expr, dep_columns))) {
          LOG_WARN("extract column exprs failed", K(ret), K(expr));
        }
        for (int64_t i = 0; OB_SUCC(ret) && i < dep_columns.count(); ++i) {
          const ObRawExpr *dep_column = dep_columns.at(i);
          if (OB_ISNULL(dep_column)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("deps_column is null");
          } else if (!dep_column->is_column_ref_expr()) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("dep column is invalid", K(ret), KPC(dep_column));
          } else if (OB_FAIL(tmp_gen_col.add_cascaded_column_id(
              static_cast<const ObColumnRefRawExpr*>(dep_column)->get_column_id()))) {
            LOG_WARN("add cascaded column id failed", K(ret));
          }
        }

        if (OB_SUCC(ret) && OB_INVALID_ID != index_id) {
          // A caller-specified id replaces the default "max used + 1" id. It must lie in
          // the user column id range and must not collide with an existing column.
          ObColumnSchemaV2 *tmp_col = data_schema.get_column_schema(index_id);
          if (is_invalid || nullptr != tmp_col) {
            ret = OB_ERR_INVALID_COLUMN_ID;
            LOG_WARN("invalid id", K(ret));
          } else {
            tmp_gen_col.set_column_id(index_id);
          }
        }
        if (OB_FAIL(ret)) {
          //do nothing
        } else if (OB_FAIL(databuff_printf(col_name_buf, OB_MAX_COLUMN_NAMES_LENGTH, pos,
                                           "SYS_NC%ld$", /*naming rules are compatible with oracle*/
                                           tmp_gen_col.get_column_id()))) {
          LOG_WARN("print generate column prefix name failed", K(ret));
        } else if (OB_FAIL(tmp_gen_col.set_column_name(col_name_buf))) {
          LOG_WARN("set column name failed", K(ret));
        } else if (OB_FAIL(tmp_gen_col.set_orig_default_value(default_value))) {
          LOG_WARN("set orig default value failed", K(ret));
        } else if (OB_FAIL(tmp_gen_col.set_cur_default_value(
                       default_value,
                       tmp_gen_col.is_default_expr_v2_column()))) {
          LOG_WARN("set current default value failed", K(ret));
        } else if (OB_FAIL(data_schema.add_column(tmp_gen_col))) {
          LOG_WARN("add column schema to data table failed", K(ret));
        } else if (OB_ISNULL(gen_col = data_schema.get_column_schema(tmp_gen_col.get_column_id()))) {
          // callers dereference gen_col on success, so never report success with NULL
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("generated column not found after being added", K(ret), K(tmp_gen_col.get_column_id()));
        }
      }
    }
  }
  return ret;
}

// Find or create the hidden virtual generated column that backs a prefix index column.
//
// The column is named "__substr<prefix_len>_<column id>" and is defined as
// SUBSTR(`<column name>`, 1, <prefix_len>). If a column with that name already exists
// in data_schema it is reused. Otherwise a copy of the base column is turned into a
// hidden, nullable, virtual generated column and added to data_schema.
//
// @param sort_item    index column item; column_name_ names the base column, prefix_len_
//                     is the prefix length, get_column_id() optionally requests the id of
//                     the generated column (OB_INVALID_ID for "none")
// @param sql_mode     decides whether PAD_WHEN_CALC_GENERATED_COLUMN_FLAG is added
// @param session      used to extract local session vars into a newly created column
// @param data_schema  data table schema; a column may be added to it and the base column
//                     gets GENERATED_DEPS_CASCADE_FLAG
// @param prefix_col   output: the reused or newly added prefix column
// @return OB_SUCCESS; OB_ERR_KEY_COLUMN_DOES_NOT_EXITS when the base column is missing;
//         OB_NOT_SUPPORTED when the base column is a generated column;
//         OB_ERR_INVALID_COLUMN_ID when the requested id cannot be honored;
//         otherwise the first error encountered
int ObIndexBuilderUtil::generate_prefix_column(
    const ObColumnSortItem &sort_item,
    const ObSQLMode sql_mode,
    ObSQLSessionInfo &session,
    ObTableSchema &data_schema,
    ObColumnSchemaV2 *&prefix_col)
{
  int ret = OB_SUCCESS;
  ObColumnSchemaV2 *old_column = NULL;
  ObColumnSchemaV2 prefix_column;
  char col_name_buf[OB_MAX_COLUMN_NAMES_LENGTH] = {'\0'};
  uint64_t tenant_data_version = 0;
  SMART_VAR(char[OB_MAX_DEFAULT_VALUE_LENGTH], expr_def) {
    MEMSET(expr_def, 0, sizeof(expr_def));
    int64_t name_pos = 0;
    int64_t def_pos = 0;
    const uint64_t spec_id = sort_item.get_column_id();
    // only consulted when spec_id != OB_INVALID_ID
    const bool is_invalid = (spec_id < OB_APP_MIN_COLUMN_ID || spec_id > OB_MIN_SHADOW_COLUMN_ID);
    if (OB_ISNULL(old_column = data_schema.get_column_schema(sort_item.column_name_))) {
      ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
      LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, sort_item.column_name_.length(), sort_item.column_name_.ptr());
    } else if (old_column->is_generated_column()) {
      ret = OB_NOT_SUPPORTED;
      LOG_USER_ERROR(OB_NOT_SUPPORTED, "prefix index on generated column");
      LOG_WARN("prefix index on generated column not supported", K(ret), KPC(old_column));
    } else if (OB_FAIL(sql::ObDDLResolver::check_prefix_key(sort_item.prefix_len_, *old_column))) {
      LOG_WARN("Incorrect prefix key", K(sort_item), K(ret));
    } else if (OB_FAIL(databuff_printf(col_name_buf, OB_MAX_COLUMN_NAMES_LENGTH, name_pos, "__substr%d_%ld",
                                       sort_item.prefix_len_, old_column->get_column_id()))) {
      LOG_WARN("print generate column prefix name failed", K(ret));
    } else if ((prefix_col = data_schema.get_column_schema(ObString(name_pos, col_name_buf))) != NULL) {
      // the generated column this index depends on already exists: reuse it, do not recreate it
      if (OB_INVALID_ID != spec_id) {
        // only create index during backup/restore sets the column id
        //   --> ObCreateIndexResolver::resolve_index_column_node
        // that column id designates the generated column of the prefix index
        // several prefix indexes may share one generated column, so the id specified by
        // the baseline backup must be the same
        if (spec_id != prefix_col->get_column_id()) {
          ret = OB_ERR_INVALID_COLUMN_ID;
          LOG_USER_ERROR(OB_ERR_INVALID_COLUMN_ID,
              sort_item.column_name_.length(), sort_item.column_name_.ptr());
          LOG_WARN("Column id specified by create prefix index mismatch with column schema id",
                   K(ret), K(spec_id), K(is_invalid), K(data_schema));
        }
      }
    } else if (OB_FAIL(databuff_printf(expr_def, OB_MAX_DEFAULT_VALUE_LENGTH, def_pos, "SUBSTR(`%s`, 1, %d)",
                                       old_column->get_column_name(), sort_item.prefix_len_))) {
      LOG_WARN("print generate expr definition prefix failed", K(ret));
    } else {
      ObObj default_value;
      default_value.set_varchar(expr_def, static_cast<int32_t>(def_pos));
      if (OB_FAIL(prefix_column.assign(*old_column))) {
        LOG_WARN("fail to assign column", KR(ret), KPC(old_column));
      } else if (FALSE_IT(prefix_column.del_column_flag(HEAP_ALTER_ROWKEY_FLAG))) { // clear flag
      } else if (FALSE_IT(prefix_column.del_column_flag(HEAP_TABLE_CLUSTERING_KEY_FLAG))) {
      } else if (!prefix_column.is_valid()) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("prefix column is invalid", K(ret));
      } else if (OB_FAIL(prefix_column.set_column_name(col_name_buf))) {
        LOG_WARN("set column name failed", K(ret));
      } else if (OB_FAIL(prefix_column.set_orig_default_value(default_value))) {
        LOG_WARN("set orig default value failed", K(ret));
      } else if (OB_FAIL(prefix_column.set_cur_default_value(
                     default_value,
                     prefix_column.is_default_expr_v2_column()))) {
        LOG_WARN("set current default value to prefix column failed", K(ret));
      } else if (OB_FAIL(prefix_column.add_cascaded_column_id(old_column->get_column_id()))) {
        LOG_WARN("add cascaded column id failed", K(ret));
      } else {
        if (ob_is_text_tc(prefix_column.get_data_type())) {
          // a prefix of a text column is stored as varchar
          prefix_column.set_data_type(ObVarcharType);
          prefix_column.set_data_scale(0);
        }
        // when the data table column is NOT NULL, null writes are already rejected there,
        // so the prefix column does not need the not-null attribute
        prefix_column.set_nullable(true);
        prefix_column.drop_not_null_cst();
        prefix_column.set_rowkey_position(0); // not a rowkey column
        prefix_column.set_index_position(0); // not an index column
        prefix_column.set_tbl_part_key_pos(0); // not a partition key
        // the length of the prefix generated column is the smaller of the base column
        // length and the prefix length
        int32_t data_len = static_cast<int32_t>(min(sort_item.prefix_len_, old_column->get_data_length()));
        prefix_column.set_data_length(data_len);
        prefix_column.add_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
        // Clear skip index attribute for virtual generated columns to avoid validation errors
        prefix_column.set_skip_index_attr(0);
        if (is_pad_char_to_full_length(sql_mode)) {
          prefix_column.add_column_flag(PAD_WHEN_CALC_GENERATED_COLUMN_FLAG);
        }
        if (old_column->is_generated_column_using_udf()) {
          prefix_column.add_column_flag(GENERATED_COLUMN_UDF_EXPR);
        }
        prefix_column.set_is_hidden(true); //for debug
        old_column->add_column_flag(GENERATED_DEPS_CASCADE_FLAG);
        prefix_column.set_prev_column_id(UINT64_MAX);
        prefix_column.set_next_column_id(UINT64_MAX);
        if (OB_INVALID_ID != spec_id) {
          // only create index during backup/restore sets the column id
          //   --> ObCreateIndexResolver::resolve_index_column_node
          // that column id designates the generated column of the prefix index
          ObColumnSchemaV2 *tmp_col = data_schema.get_column_schema(spec_id);
          if (is_invalid || NULL != tmp_col) {
            ret = OB_ERR_INVALID_COLUMN_ID;
            LOG_USER_ERROR(OB_ERR_INVALID_COLUMN_ID,
                sort_item.column_name_.length(), sort_item.column_name_.ptr());
            LOG_WARN("Column id specified by create prefix index mismatch with column schema id",
                     K(ret), K(spec_id), K(is_invalid), K(data_schema));
          } else {
            prefix_column.set_column_id(spec_id);
          }
        } else {
          prefix_column.set_column_id(data_schema.get_max_used_column_id() + 1);
        }
        if (OB_FAIL(ret)) {
        } else if (OB_FAIL(data_schema.add_column(prefix_column))) {
          LOG_WARN("add column to data schema failed", K(ret));
        } else if (OB_ISNULL(prefix_col = data_schema.get_column_schema(prefix_column.get_column_id()))) {
          // prefix_col is dereferenced below and by the caller, so never continue with NULL
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("prefix column not found after being added", K(ret), K(prefix_column.get_column_id()));
        }
      }
      if (OB_SUCC(ret)) {
        // local session vars are attached only to a newly created prefix column
        if (OB_FAIL(GET_MIN_DATA_VERSION(data_schema.get_tenant_id(), tenant_data_version))) {
          LOG_WARN("get tenant data version failed", K(ret));
        } else if (tenant_data_version < DATA_VERSION_4_2_2_0) {
          //do nothing
        } else if (OB_FAIL(ObRawExprUtils::extract_local_vars_for_prefix_gen_col(*old_column, session, *prefix_col))) {
          LOG_WARN("fail to extract sysvar from expr", K(ret));
        }
      }
    }
  }
  return ret;
}

// Rewrites the index column list of a spatial index creation request.
//
// A spatial index is built on two hidden generated columns (a uint64 cellid
// column and a varchar mbr column) derived from the first column listed in
// arg.index_columns_. On success arg.index_columns_ holds exactly two sort
// items naming those generated columns (cellid first, mbr second) and the
// column schemas produced by generate_spatial_columns() are left in
// spatial_cols.
//
// @param arg          create index argument whose index_columns_ is replaced
// @param data_schema  data table schema that receives the generated columns
// @param allocator    allocator backing the copied column names
// @param spatial_cols output, filled by generate_spatial_columns()
// @return OB_SUCCESS, OB_ERR_UNEXPECTED when the column list is empty or the
//         generated columns are not the expected pair, or the error reported
//         by a callee (OB_ERR_COLUMN_DUPLICATE is mapped to
//         OB_ERR_DOMAIN_COLUMN_DUPLICATE in oracle mode)
int ObIndexBuilderUtil::adjust_spatial_args(
    ObCreateIndexArg &arg,
    ObTableSchema &data_schema,
    ObIAllocator &allocator,
    ObIArray<ObColumnSchemaV2*> &spatial_cols)
{
  int ret = OB_SUCCESS;
  ObIArray<ObColumnSortItem> &index_columns = arg.index_columns_;

  // Phase 1: generate the hidden columns and make sure we got the expected pair.
  if (OB_UNLIKELY(index_columns.empty())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get invalid arguments", K(ret));
  } else if (OB_FAIL(generate_spatial_columns(index_columns.at(0).column_name_,
                                              data_schema,
                                              spatial_cols))) {
    if (lib::is_oracle_mode() && OB_ERR_COLUMN_DUPLICATE == ret) { // error code adaptation
      ret = OB_ERR_DOMAIN_COLUMN_DUPLICATE;
      LOG_WARN("cannot create multiple domain indexes on a column list using same", K(ret));
    } else {
      LOG_WARN("generate spatial column failed", K(ret));
    }
  } else if (OB_UNLIKELY(2 != spatial_cols.count())
             || OB_ISNULL(spatial_cols.at(0))
             || OB_ISNULL(spatial_cols.at(1))) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get invalid spatial cols", K(ret), K(spatial_cols.count()));
  }

  // Phase 2: build the replacement sort items and swap them into the argument.
  if (OB_SUCC(ret)) {
    ObArray<ObColumnSortItem> rewritten_items;
    ObColumnSortItem cellid_item;
    ObColumnSortItem mbr_item;
    if (OB_FAIL(ob_write_string(allocator,
                                spatial_cols.at(0)->get_column_name_str(),
                                cellid_item.column_name_))) {
      LOG_WARN("failed to copy column name", K(ret));
    } else if (OB_FAIL(ob_write_string(allocator,
                                       spatial_cols.at(1)->get_column_name_str(),
                                       mbr_item.column_name_))) {
      LOG_WARN("failed to copy column name", K(ret));
    } else if (OB_FAIL(rewritten_items.push_back(cellid_item))) {
      LOG_WARN("push back cellid sort item to new sort items failed", K(ret));
    } else if (OB_FAIL(rewritten_items.push_back(mbr_item))) {
      LOG_WARN("push back mbr sort item to new sort items failed", K(ret));
    } else {
      // Only drop the user supplied column list once the replacement is complete.
      index_columns.reset();
      if (OB_FAIL(index_columns.assign(rewritten_items))) {
        LOG_WARN("assign sort items failed", K(ret));
      }
    }
  }

  return ret;
}

// Creates the two hidden generated columns backing a spatial index on
// col_name and appends them to spatial_cols, cellid column first and mbr
// column second.
//
// @param col_name     name of the column the spatial index is declared on
// @param data_schema  data table schema; the generated columns are added to it
// @param spatial_cols output array receiving the two generated column schemas
// @return OB_SUCCESS, OB_ERR_KEY_COLUMN_DOES_NOT_EXITS when col_name is not
//         found in data_schema, or the error reported by a callee
int ObIndexBuilderUtil::generate_spatial_columns(
    const common::ObString &col_name,
    ObTableSchema &data_schema,
    ObIArray<ObColumnSchemaV2*> &spatial_cols)
{
  int ret = OB_SUCCESS;
  ObColumnSchemaV2 *cellid_col = NULL;
  ObColumnSchemaV2 *mbr_col = NULL;
  ObColumnSchemaV2 *col_schema = data_schema.get_column_schema(col_name);

  if (OB_ISNULL(col_schema)) {
    ret = OB_ERR_KEY_COLUMN_DOES_NOT_EXITS;
    LOG_USER_ERROR(OB_ERR_KEY_COLUMN_DOES_NOT_EXITS, col_name.length(), col_name.ptr());
  } else {
    // Derive both hidden columns from the indexed column.
    if (OB_FAIL(generate_spatial_cellid_column(*col_schema, data_schema, cellid_col))) {
      LOG_WARN("generate spatial cellid column failed", K(ret), K(col_name), K(*col_schema));
    } else if (OB_FAIL(generate_spatial_mbr_column(*col_schema, data_schema, mbr_col))) {
      LOG_WARN("generate spatial mbr column failed", K(ret), K(col_name), K(*col_schema));
    }

    // Hand them back to the caller in the fixed order: cellid, then mbr.
    if (OB_FAIL(ret)) {
      // generation failed, nothing to publish
    } else if (OB_FAIL(spatial_cols.push_back(cellid_col))) {
      LOG_WARN("push back spatial cellid column failed", K(ret), K(col_name), K(*cellid_col));
    } else if (OB_FAIL(spatial_cols.push_back(mbr_col))) {
      LOG_WARN("push back spatial mbr column failed", K(ret), K(col_name), K(*mbr_col));
    }
  }
  return ret;
}

// Adds the hidden virtual generated "cellid" column of a spatial index to
// data_schema.
//
// The column is named "__cellid_<column id of col_schema>", has type
// ObUInt64Type, and its default value holds the generating expression
// SPATIAL_CELLID(<column name>) (back-quoted in mysql mode, bare in oracle
// mode). col_schema is flagged with GENERATED_DEPS_CASCADE_FLAG because the
// new column depends on it.
//
// @param col_schema   the column the cellid is derived from
// @param data_schema  data table schema the new column is added to
// @param cellid_col   output; reset to NULL on entry, set to the column stored
//                     in data_schema on success
// @return OB_SUCCESS or the error reported by a callee
int ObIndexBuilderUtil::generate_spatial_cellid_column(
    ObColumnSchemaV2 &col_schema,
    ObTableSchema &data_schema,
    ObColumnSchemaV2 *&cellid_col)
{
  cellid_col = NULL;
  int ret = OB_SUCCESS;
  ObColumnSchemaV2 cellid_schema;
  char name_buf[OB_MAX_COLUMN_NAMES_LENGTH] = {'\0'};
  SMART_VAR(char[OB_MAX_DEFAULT_VALUE_LENGTH], expr_buf) {
    MEMSET(expr_buf, 0, sizeof(expr_buf));
    const uint64_t geo_col_id = col_schema.get_column_id();
    int64_t name_len = 0;
    int64_t expr_len = 0;

    // Step 1: column name, "__cellid" followed by "_<source column id>".
    if (OB_FAIL(databuff_printf(name_buf, OB_MAX_COLUMN_NAMES_LENGTH,
                                name_len, "__cellid"))) {
      LOG_WARN("print __cellid to col_name_buf failed", K(ret));
    } else if (OB_FAIL(databuff_printf(name_buf, OB_MAX_COLUMN_NAMES_LENGTH,
                                       name_len, "_%ld", geo_col_id))) {
      LOG_WARN("print column id to col_name_buf failed", K(ret), K(geo_col_id));
    }

    // Step 2: generating expression, quoted according to the compat mode.
    if (OB_FAIL(ret)) {
      // name could not be built
    } else if (lib::is_mysql_mode()) {
      if (OB_FAIL(databuff_printf(expr_buf, OB_MAX_DEFAULT_VALUE_LENGTH,
                                  expr_len, "SPATIAL_CELLID(`%s`)", col_schema.get_column_name()))) {
        LOG_WARN("print cellid expr to cellid_expr_def failed", K(ret), K(col_schema.get_column_name()));
      }
    } else if (lib::is_oracle_mode()) {
      if (OB_FAIL(databuff_printf(expr_buf, OB_MAX_DEFAULT_VALUE_LENGTH,
                                  expr_len, "SPATIAL_CELLID(%s)", col_schema.get_column_name()))) {
        LOG_WARN("print cellid expr to cellid_expr_def failed", K(ret), K(col_schema.get_column_name()));
      }
    }

    // Step 3: describe the column and register it in the data table schema.
    if (OB_FAIL(ret)) {
      // name or expression could not be built
    } else if (OB_FAIL(cellid_schema.add_cascaded_column_id(geo_col_id))) {
      LOG_WARN("add cascaded column to generated column failed", K(ret));
    } else {
      col_schema.add_column_flag(GENERATED_DEPS_CASCADE_FLAG);
      ObObj expr_obj;
      expr_obj.set_varchar(expr_buf, static_cast<int32_t>(expr_len));
      cellid_schema.set_rowkey_position(0); // not a rowkey column
      cellid_schema.set_index_position(0); // not an index column
      cellid_schema.set_tbl_part_key_pos(0); // not a partition key
      cellid_schema.set_tenant_id(data_schema.get_tenant_id());
      cellid_schema.set_table_id(data_schema.get_table_id());
      cellid_schema.set_column_id(data_schema.get_max_used_column_id() + 1);
      cellid_schema.add_column_flag(SPATIAL_INDEX_GENERATED_COLUMN_FLAG);
      cellid_schema.add_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
      cellid_schema.set_is_hidden(true);
      cellid_schema.set_data_type(ObUInt64Type);
      cellid_schema.set_collation_type(col_schema.get_collation_type());
      cellid_schema.set_prev_column_id(UINT64_MAX);
      cellid_schema.set_next_column_id(UINT64_MAX);
      cellid_schema.set_geo_col_id(geo_col_id);
      if (OB_FAIL(cellid_schema.set_column_name(name_buf))) {
        LOG_WARN("set column name failed", K(ret));
      } else if (OB_FAIL(cellid_schema.set_orig_default_value(expr_obj))) {
        LOG_WARN("set orig default value failed", K(ret));
      } else if (OB_FAIL(cellid_schema.set_cur_default_value(
                     expr_obj,
                     cellid_schema.is_default_expr_v2_column()))) {
        LOG_WARN("set current default value failed", K(ret));
      } else if (OB_FAIL(data_schema.add_column(cellid_schema))) {
        LOG_WARN("add cellid column schema to data table failed", K(ret));
      } else {
        // Return the column looked up in data_schema, not the local cellid_schema.
        cellid_col = data_schema.get_column_schema(cellid_schema.get_column_id());
      }
    }
  }

  return ret;
}

// Adds the hidden virtual generated "mbr" column of a spatial index to
// data_schema.
//
// The column is named "__mbr_<column id of col_schema>", has type
// ObVarcharType with data length OB_DEFAULT_MBR_SIZE, and its default value
// holds the generating expression SPATIAL_MBR(<column name>) (back-quoted in
// mysql mode, bare in oracle mode). col_schema is flagged with
// GENERATED_DEPS_CASCADE_FLAG because the new column depends on it.
//
// @param col_schema   the column the mbr is derived from
// @param data_schema  data table schema the new column is added to
// @param mbr_col      output; reset to NULL on entry, set to the column stored
//                     in data_schema on success
// @return OB_SUCCESS or the error reported by a callee
int ObIndexBuilderUtil::generate_spatial_mbr_column(
    ObColumnSchemaV2 &col_schema,
    ObTableSchema &data_schema,
    ObColumnSchemaV2 *&mbr_col)
{
  mbr_col = NULL;
  int ret = OB_SUCCESS;
  ObColumnSchemaV2 mbr_schema;
  char name_buf[OB_MAX_COLUMN_NAMES_LENGTH] = {'\0'};
  SMART_VAR(char[OB_MAX_DEFAULT_VALUE_LENGTH], expr_buf) {
    MEMSET(expr_buf, 0, sizeof(expr_buf));
    const uint64_t geo_col_id = col_schema.get_column_id();
    int64_t name_len = 0;
    int64_t expr_len = 0;

    // Step 1: column name, "__mbr" followed by "_<source column id>".
    if (OB_FAIL(databuff_printf(name_buf, OB_MAX_COLUMN_NAMES_LENGTH,
                                name_len, "__mbr"))) {
      LOG_WARN("print generate column mbr name failed", K(ret));
    } else if (OB_FAIL(databuff_printf(name_buf, OB_MAX_COLUMN_NAMES_LENGTH,
                                       name_len, "_%ld", geo_col_id))) {
      LOG_WARN("print column id to buffer failed", K(ret), K(geo_col_id));
    }

    // Step 2: generating expression, quoted according to the compat mode.
    if (OB_FAIL(ret)) {
      // name could not be built
    } else if (lib::is_mysql_mode()) {
      if (OB_FAIL(databuff_printf(expr_buf, OB_MAX_DEFAULT_VALUE_LENGTH,
                                  expr_len, "SPATIAL_MBR(`%s`)", col_schema.get_column_name()))) {
        LOG_WARN("print mbr expr to mbr_expr_def failed", K(ret), K(col_schema.get_column_name()));
      }
    } else if (lib::is_oracle_mode()) {
      if (OB_FAIL(databuff_printf(expr_buf, OB_MAX_DEFAULT_VALUE_LENGTH,
                                  expr_len, "SPATIAL_MBR(%s)", col_schema.get_column_name()))) {
        LOG_WARN("print mbr expr to mbr_expr_def failed", K(ret), K(col_schema.get_column_name()));
      }
    }

    // Step 3: describe the column and register it in the data table schema.
    if (OB_FAIL(ret)) {
      // name or expression could not be built
    } else if (OB_FAIL(mbr_schema.add_cascaded_column_id(geo_col_id))) {
      LOG_WARN("add cascaded column to generated column failed", K(ret));
    } else {
      col_schema.add_column_flag(GENERATED_DEPS_CASCADE_FLAG);
      ObObj expr_obj;
      expr_obj.set_varchar(expr_buf, static_cast<int32_t>(expr_len));
      mbr_schema.set_rowkey_position(0); // not a rowkey column
      mbr_schema.set_index_position(0); // not an index column
      mbr_schema.set_tbl_part_key_pos(0); // not a partition key
      mbr_schema.set_tenant_id(data_schema.get_tenant_id());
      mbr_schema.set_table_id(data_schema.get_table_id());
      mbr_schema.set_column_id(data_schema.get_max_used_column_id() + 1);
      mbr_schema.add_column_flag(SPATIAL_INDEX_GENERATED_COLUMN_FLAG);
      mbr_schema.add_column_flag(VIRTUAL_GENERATED_COLUMN_FLAG);
      mbr_schema.set_is_hidden(true);
      mbr_schema.set_data_type(ObVarcharType);
      mbr_schema.set_data_length(OB_DEFAULT_MBR_SIZE);
      mbr_schema.set_collation_type(col_schema.get_collation_type());
      mbr_schema.set_prev_column_id(UINT64_MAX);
      mbr_schema.set_next_column_id(UINT64_MAX);
      mbr_schema.set_geo_col_id(geo_col_id);
      if (OB_FAIL(mbr_schema.set_column_name(name_buf))) {
        LOG_WARN("set column name failed", K(ret));
      } else if (OB_FAIL(mbr_schema.set_orig_default_value(expr_obj))) {
        LOG_WARN("set orig default value failed", K(ret));
      } else if (OB_FAIL(mbr_schema.set_cur_default_value(
                     expr_obj,
                     mbr_schema.is_default_expr_v2_column()))) {
        LOG_WARN("set current default value failed", K(ret));
      } else if (OB_FAIL(data_schema.add_column(mbr_schema))) {
        LOG_WARN("add mbr column schema to data table failed", K(ret));
      } else {
        // Return the column looked up in data_schema, not the local mbr_schema.
        mbr_col = data_schema.get_column_schema(mbr_schema.get_column_id());
      }
    }
  }

  return ret;
}
// Looks up the state of an existing index and the ddl task attached to it.
//
// One statement is issued through GCTX.sql_proxy_ on behalf of tenant_id. It
// reads index_status of index_id from OB_ALL_TABLE_TNAME and task_id from
// OB_ALL_DDL_TASK_STATUS_TNAME (matched on target_object_id).
//
// @param tenant_id  tenant the query is executed for
// @param index_id   table id of the index; must not be OB_INVALID_ID
// @param task_id    output; set to 0 on entry, then to the value read from the
//                   ddl task table. The extraction macro is invoked with
//                   skip-null-error and a default of -1, so -1 presumably
//                   means "no ddl task row" (macro body not visible here).
// @return OB_SUCCESS;
//         OB_ERR_UNEXPECTED when the sql proxy or the result is null, or when
//         the index is not available and task_id is -1;
//         OB_INVALID_ARGUMENT when index_id is invalid;
//         OB_TABLE_NOT_EXIST when the query yields no row;
//         otherwise the error reported by a callee
int ObIndexBuilderUtil::check_index_for_if_not_exist(const uint64_t tenant_id, const uint64_t index_id, int64_t &task_id)
{
  int ret = OB_SUCCESS;
  ObSqlString sql;
  task_id = 0;
  if (OB_ISNULL(GCTX.sql_proxy_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("sql proxy is null", KR(ret));
  } else if (OB_INVALID_ID == index_id) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("index id invalid", KR(ret), K(index_id));
  } else if (OB_FAIL(sql.append_fmt("SELECT (SELECT index_status FROM %s where tenant_id = %lu and table_id = %lu) as index_status,"
                                    "(SELECT task_id FROM %s where target_object_id = %lu) as task_id",
                                    OB_ALL_TABLE_TNAME, OB_INVALID_TENANT_ID, index_id,
                                    OB_ALL_DDL_TASK_STATUS_TNAME, index_id))) {
    LOG_WARN("fail to append fmt", KR(ret));
  } else {
    SMART_VAR(ObMySQLProxy::MySQLResult, res) {
      sqlclient::ObMySQLResult *result = NULL;
      ObIndexStatus index_status = INDEX_STATUS_MAX;
      if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) {
        LOG_WARN("fail to read sql", KR(ret), K(tenant_id));
      } else {
        result = res.get_result();
        if (OB_ISNULL(result)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("result is null", KR(ret), K(tenant_id));
        } else if (OB_FAIL(result->next())) {
          if (OB_ITER_END == ret) {
            // an empty result set is reported as a missing table
            ret = OB_TABLE_NOT_EXIST;
          } else {
            LOG_WARN("fail to get next", KR(ret));
          }
        } else {
          EXTRACT_INT_FIELD_MYSQL(*result, "index_status", index_status, ObIndexStatus);
          EXTRACT_INT_FIELD_MYSQL_WITH_DEFAULT_VALUE(*result, "task_id", task_id,
                                                     int64_t, true /*skip null error*/,
                                                     false /*skip column error*/, -1);
          if (OB_FAIL(ret)) {
            // field extraction failed, keep that error
          } else if (ObIndexStatus::INDEX_STATUS_AVAILABLE != index_status && -1 == task_id) {
            // an index that is not yet available must still have its ddl task
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("index status not available but ddl task not exist not expected", KR(ret), K(tenant_id), K(index_id));
          }
        }
      }
    } // end smart var
  }
  return ret;
}

}//end namespace share
}//end namespace oceanbase
